package com.dec.kks.etl.spark_stream_ch_kafka;

import com.dec.kks.etl.producer.DECConfigUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;

public class SparkSteamingKafkaCheckpointMain {

    private static final Logger LOGGER = LoggerFactory.getLogger(SparkSteamingKafkaCheckpointMain.class);

    /** HDFS directory holding the streaming checkpoint data used for driver recovery. */
    static final String checkpointPath = "/user/hdfs/checkpoint1";

    /**
     * Builds a fresh {@link JavaStreamingContext} wired to a Kafka direct stream with
     * checkpointing enabled. Invoked by {@code getOrCreate} only when no usable
     * checkpoint exists at {@link #checkpointPath}; on recovery the context (including
     * the DStream graph defined here) is restored from the checkpoint instead.
     *
     * @return a fully configured, not-yet-started streaming context
     * @throws Exception if the Kafka parameter config cannot be loaded
     */
    public static JavaStreamingContext createJSSC() throws Exception {
        System.setProperty("hadoop.home.dir", "/home/hdfs/bigdata/hadoop-2.7.4");

        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("spark steaming kafka checkpoint");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // 1-second batch interval.
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // Kafka consumer properties loaded from an external config directory.
        // NOTE(review): hard-coded absolute path — assumes this exact deployment layout; confirm.
        Map<String, Object> kafkaParams = DECConfigUtil.loadConfig("/home/hdfs/soft/IdeaProjects/dec-kks-etl/src/main/resources", 0);
        Collection<String> topics = Arrays.asList("dec-rds-vibration-p1-s1-test");

        // Enable metadata checkpointing so getOrCreate() can recover this context after failure.
        jssc.checkpoint(checkpointPath);

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        // Checkpoint the DStream every 10 seconds (10x the batch interval, per Spark's guidance).
        // Same 10 000 ms as the original Duration.apply(10000), expressed consistently with
        // the Durations helper used elsewhere in this file.
        stream.checkpoint(Durations.seconds(10));

        // BUG FIX: the original called consumerRecordJavaRDD.map(...) here with no terminal
        // action — map is a lazy transformation, so the per-record code never executed.
        // foreach is an action and actually processes every record on the executors.
        // (The original's Thread.sleep(1000) per record was dropped: it never ran, and if it
        // had, it would stall an executor thread for a second per message.)
        stream.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
            @Override
            public void call(JavaRDD<ConsumerRecord<String, String>> consumerRecordJavaRDD) throws Exception {
                consumerRecordJavaRDD.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                    @Override
                    public void call(ConsumerRecord<String, String> sscr) throws Exception {
                        System.out.println("kafka消息：->" + sscr.key() + "-:-" + sscr.value());
                    }
                });
            }
        });

        // Second output: log each value on the driver and print the first few per batch.
        stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> sscr) throws Exception {
                // Parameterized logging instead of string concatenation; message text unchanged.
                // NOTE(review): ERROR level for an informational message — kept as-is; confirm intent.
                LOGGER.error("打印：{}", sscr.value());
                return sscr.value();
            }
        }).print();

        return jssc;
    }

    /**
     * Entry point: restores the streaming context from {@link #checkpointPath} if a
     * checkpoint exists, otherwise builds a new one via {@link #createJSSC()}, then
     * runs until terminated.
     */
    public static void main(String[] args) throws Exception {
        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointPath, new Function0<JavaStreamingContext>() {
            @Override
            public JavaStreamingContext call() throws Exception {
                return createJSSC();
            }
        });
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}
